-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Query Session Pool redesign #476
Conversation
🌋 Here are results of SLO test for Python SDK over Table Service: |
🌋 Here are results of SLO test for Python SDK over Query Service: |
ff838aa
to
4a5c58e
Compare
3526fc2
to
c087ffb
Compare
pool._size = target_size | ||
ids = set() | ||
|
||
for i in range(1, target_size + 1): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not
for i in range (target_size)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for better readability here
assert pool._current_size == i
I think it does not matter where to drop a +1
so it was a coin flip
ids.add(session._state.session_id) | ||
|
||
with pytest.raises(asyncio.TimeoutError): | ||
await asyncio.wait_for(pool.acquire(), timeout=0.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be whait 0.1 or 0.01 second?
await pool.release(session) | ||
|
||
session = await pool.acquire() | ||
assert pool._current_size == target_size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about save session id before release and compare aquired session id with exactly saved id instead of one of
?
docker_project.stop() | ||
try: | ||
await asyncio.wait_for(pool.acquire(), timeout=0.5) | ||
except ydb.Error: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why ydb.Error instead of TimeoutError?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we will have an error from driver after ydb scale down
ydb/aio/query/pool.py
Outdated
done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED) | ||
if task_stop in done: | ||
queue_get.cancel() | ||
return await self._create_new_session() # TODO: not sure why |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What mean the comment?
|
||
await asyncio.gather(*tasks) | ||
|
||
logger.debug("All session were deleted.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about add __del__
for stop the pool? may be hard stop: without wait anything and with a log message?
fot prevent leaks
ydb/aio/query/pool.py
Outdated
@@ -96,13 +158,17 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): | |||
|
|||
|
|||
class SimpleQuerySessionCheckoutAsync: | |||
def __init__(self, pool: QuerySessionPoolAsync): | |||
def __init__(self, pool: QuerySessionPoolAsync, timeout: Optional[float] = None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need timeout in async?
ydb/query/pool.py
Outdated
start = time.monotonic() | ||
if session is None and self._current_size == self._size: | ||
try: | ||
_, session = self._queue.get(block=True, timeout=timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is long operation under mutex lock.
How you will enforce smaller timeout on parallel request?
9489f7a
to
1d26890
Compare
ydb/aio/query/pool.py
Outdated
done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED) | ||
if task_stop in done: | ||
queue_get.cancel() | ||
return await self._create_new_session() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why it create a session on stop pool?
may be runtime error - similar to acquire on stopped pool?
ydb/aio/query/pool.py
Outdated
if self._should_stop.is_set() or self._loop.is_closed(): | ||
return | ||
|
||
self._loop.call_soon(functools.partial(self.stop)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need "partial"?
3f425bf
to
ef7e50e
Compare
ef7e50e
to
4bb2566
Compare
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information